08 流式输出
上一篇文章我们学习了事件流stream_events,它是LangChain v1.3之后推荐的新API。而stream方法是更底层的流式输出API,它提供了三种不同的流模式,可以灵活组合使用。
简单来说,stream_events是一个"高级包装",stream是"底层原语"。如果你是新项目,官方推荐用stream_events;但stream的灵活性更强,有些场景下它更合适。
一、三种流模式
stream方法支持三种流模式:
| 模式 | 用途 |
|---|---|
updates | 每个Agent步骤后输出状态更新,比如模型调用了什么工具、返回了什么结果 |
messages | 流式输出LLM生成的token,实现"打字机"效果 |
custom | 自定义数据,可以在工具内部输出任意进度信息 |
这三种模式可以单独使用,也可以组合使用。
二、Agent进度输出
使用stream_mode="updates"可以获取Agent每一步的状态变化。比如一个简单的天气查询,会经历三步:
- 模型节点:LLM决定调用
get_weather工具 - 工具节点:执行
get_weather工具,返回结果 - 模型节点:LLM根据工具结果生成最终回复
from langchain.agents import create_agent
def get_weather(city: str) -> str:
"""查询城市天气"""
return f"{city}今天晴天,气温28°C~35°C"
agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
stream_mode="updates",
version="v2",
):
print(f"类型: {chunk['type']}")
print(f"数据: {chunk['data']}")
print()输出效果类似:
类型: updates
数据: {'model': {'messages': [AIMessage(content='', tool_calls=[{'name': 'get_weather', 'args': {'city': '杭州'}, ...}])]}}
类型: updates
数据: {'tools': {'messages': [ToolMessage(content='杭州今天晴天,气温28°C~35°C', ...)]}}
类型: updates
数据: {'model': {'messages': [AIMessage(content='杭州今天天气晴朗,气温28°C到35°C,适合出门。')]}每个chunk是一个字典,type表示流模式,data是具体的数据内容。
三、LLM Tokens流式输出
使用stream_mode="messages"可以逐token输出模型的回复,实现"打字机"效果:
from langchain.agents import create_agent
def get_weather(city: str) -> str:
"""查询城市天气"""
return f"{city}今天晴天,气温28°C~35°C"
agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
stream_mode="messages",
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
# 输出token内容
print(f"节点: {metadata['langgraph_node']}")
print(f"内容: {token.content_blocks}")
print()输出效果类似:
节点: model
内容: [{'type': 'tool_call_chunk', 'name': 'get_weather', 'args': '', ...}]
节点: model
内容: [{'type': 'tool_call_chunk', 'name': None, 'args': '{"city', ...}]
节点: tools
内容: [{'type': 'text', 'text': '杭州今天晴天,气温28°C~35°C'}]
节点: model
内容: [{'type': 'text', 'text': '杭州'}]
节点: model
内容: [{'type': 'text', 'text': '今天'}]
节点: model
内容: [{'type': 'text', 'text': '天气晴朗'}]每个chunk是一个(token, metadata)元组,token包含内容,metadata包含节点名称等信息。
四、流式输出推理过程
有些模型支持"深度思考"模式,比如DeepSeek。使用stream_mode="messages"并过滤reasoning类型的内容块,可以流式输出模型的推理过程:
from langchain.agents import create_agent
def get_weather(city: str) -> str:
"""查询城市天气"""
return f"{city}今天晴天,气温28°C~35°C"
agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
stream_mode="messages",
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
for block in token.content_blocks:
if block["type"] == "reasoning":
print(f"[思考] {block['text']}", end="", flush=True)
elif block["type"] == "text":
print(block["text"], end="", flush=True)输出效果类似:
[思考] 用户问杭州天气,我需要调用get_weather工具...
杭州今天晴天,气温28°C~35°C,适合出门。不同模型的推理格式不一样(Anthropic叫thinking块,OpenAI叫reasoning摘要),但LangChain统一把它们标准化成了reasoning类型,你不用关心底层差异。
五、自定义数据流式输出
有些工具执行时间很长,比如处理大文件、调用外部API等,你可能想实时输出处理进度。这时候可以用get_stream_writer()在工具内部输出自定义数据:
import time
from langchain.agents import create_agent
from langgraph.config import get_stream_writer
def generate_report(topic: str) -> str:
"""生成研究报告"""
writer = get_stream_writer()
writer(f"正在收集{topic}相关资料...")
time.sleep(2)
writer(f"正在分析数据...")
time.sleep(2)
writer(f"正在撰写报告...")
time.sleep(2)
writer(f"报告生成完成!")
return f"《{topic}研究报告》已生成"
agent = create_agent(model="deepseek-v4-flash", tools=[generate_report])
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "生成一份AI行业研究报告"}]},
stream_mode="custom",
version="v2",
):
if chunk["type"] == "custom":
print(chunk["data"])输出效果:
正在收集AI行业相关资料...
正在分析数据...
正在撰写报告...
报告生成完成!注意: 使用
get_stream_writer()的工具只能在LangGraph执行环境中调用,单独调用会报错。
六、多模式组合使用
在实际项目中,你往往同时需要多种流模式。比如既想看Agent的进度更新,又想看自定义的进度信息。可以把stream_mode传一个列表:
from langchain.agents import create_agent
from langgraph.config import get_stream_writer
def get_weather(city: str) -> str:
"""查询城市天气"""
writer = get_stream_writer()
writer(f"正在查询{city}的天气数据...")
return f"{city}今天晴天,气温28°C~35°C"
agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
stream_mode=["updates", "custom"],
version="v2",
):
print(f"模式: {chunk['type']}")
print(f"数据: {chunk['data']}")
print()输出效果:
模式: custom
数据: 正在查询杭州的天气数据...
模式: updates
数据: {'model': {'messages': [AIMessage(...)]}}
模式: updates
数据: {'tools': {'messages': [ToolMessage(...)]}}
模式: updates
数据: {'model': {'messages': [AIMessage(...)]}}七、流式输出工具调用
当LLM生成工具调用时,参数是逐步生成的(比如{"city": "杭州"}是分多个token生成的)。使用stream_mode="messages"可以拿到这些增量:
from langchain.agents import create_agent
from langchain.messages import AIMessageChunk
def get_weather(city: str) -> str:
"""查询城市天气"""
return f"{city}今天晴天,气温28°C~35°C"
agent = create_agent(model="deepseek-v4-flash", tools=[get_weather])
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
stream_mode=["messages", "updates"],
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
if isinstance(token, AIMessageChunk):
# 输出文本增量
if token.text:
print(token.text, end="|", flush=True)
# 输出工具调用参数增量
if token.tool_call_chunks:
print(token.tool_call_chunks)
elif chunk["type"] == "updates":
for source, update in chunk["data"].items():
if source == "tools":
# 工具执行完成,输出结果
print(f"工具结果: {update['messages'][-1].content_blocks}")输出效果类似:
[{'name': 'get_weather', 'args': '', 'id': 'call_xxx', 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'city', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '杭州', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
工具结果: [{'type': 'text', 'text': '杭州今天晴天,气温28°C~35°C'}]
杭州|今天|天气晴朗|,|气温|28°C|~|35°C|,|适合|出门|。|八、子Agent的流式输出
在多Agent协作的场景中,你需要区分token是哪个Agent输出的。创建Agent时给它起个name,然后在流式输出中通过lc_agent_name来判断:
from langchain.agents import create_agent
def get_weather(city: str) -> str:
"""查询城市天气"""
return f"{city}今天晴天,气温28°C~35°C"
# 创建天气Agent
weather_agent = create_agent(
model="deepseek-v4-flash",
tools=[get_weather],
name="weather_agent",
)
def call_weather(query: str) -> str:
"""调用天气Agent"""
result = weather_agent.invoke({"messages": [{"role": "user", "content": query}]})
return result["messages"][-1].text
# 创建主Agent
agent = create_agent(
model="deepseek-v4-flash",
tools=[call_weather],
name="supervisor",
)
current_agent = None
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "杭州天气怎么样?"}]},
stream_mode=["messages", "updates"],
subgraphs=True,
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
# 通过lc_agent_name判断当前是哪个Agent
if agent_name := metadata.get("lc_agent_name"):
if agent_name != current_agent:
print(f"\n🤖 {agent_name}: ")
current_agent = agent_name
if token.text:
print(token.text, end="|", flush=True)
elif chunk["type"] == "updates":
for source, update in chunk["data"].items():
if source == "tools":
print(f"\n工具结果: {update['messages'][-1].content_blocks}")输出效果类似:
🤖 supervisor:
[工具调用参数...]
🤖 weather_agent:
[工具调用参数...]
工具结果: [{'type': 'text', 'text': '杭州今天晴天,气温28°C~35°C'}]
杭州|今天|天气晴朗|,|适合|出门|。|
🤖 supervisor:
杭州|天气|查询|结果|:|晴天|,|28°C|~|35°C|。|九、禁用流式输出
有些场景下你不需要流式输出,比如后端批量处理任务。可以在创建模型时设置streaming=False:
from langchain_openai import ChatOpenAI
model = ChatOpenAI(model="gpt-5.4", streaming=False)如果模型不支持streaming参数,可以用disable_streaming=True代替,这个参数在所有模型上都可用。
十、总结
stream方法提供了三种流模式,各有各的用途:
updates:看Agent的执行步骤,适合调试和展示进度messages:逐token输出,适合前端"打字机"效果custom:自定义输出,适合长时间任务的进度通知
三种模式可以组合使用,通过chunk["type"]来区分数据来源。
如果你是新项目,官方更推荐使用stream_eventsAPI(上一篇文章介绍的),它的类型化投影更方便使用。但stream的灵活性更强,理解它能帮你更好地掌握LangChain的流式机制。